from kafka import KafkaConsumer
import json

# Kafka配置
KAFKA_SERVERS = ['localhost:9092']  # 替换为你的Kafka服务器地址
TOPIC4 = 'topic-4'

class KafkaConsumerExample:
    def __init__(self):
        # 初始化消费者
        self.consumer = KafkaConsumer(
            TOPIC4,
            bootstrap_servers=KAFKA_SERVERS,
            auto_offset_reset='earliest',  # 从最早的消息开始读取
            enable_auto_commit=True,
            group_id='topic4-printer-group',  # 消费者组ID
            value_deserializer=lambda x: json.loads(x.decode('utf-8'))
        )
    
    def print_messages(self):
        """打印topic4的所有消息"""
        print(f"开始监听并打印 {TOPIC4} 的消息...")
        try:
            for message in self.consumer:
                data = message.value
                print(f"收到消息: {data}")
        except KeyboardInterrupt:
            print("停止消费消息")
        except Exception as e:
            print(f"消费消息出错: {e}")
        finally:
            self.consumer.close()

if __name__ == "__main__":
    consumer = KafkaConsumerExample()
    consumer.print_messages()

